package com.example;

import com.example.entity.User;
import com.fasterxml.jackson.databind.JsonSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;


import java.util.Properties;

/**
 * Minimal Kafka Streams application that consumes {@link User} records from
 * the {@code input-topic-hhhh} topic and prints each value to standard output.
 *
 * <p>Keys are read with the String serde and values with
 * {@link SpecificAvroSerde}, which is pointed at the schema registry URL
 * configured in {@link #main(String[])}.
 */
public class KafkaStreamsExample {

    /**
     * Configures, builds and starts the streams topology.
     *
     * <p>The shutdown hook is registered <em>before</em> the client is started so
     * that a termination signal arriving at any point after start-up still
     * results in {@link KafkaStreams#close()} being called.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.80:9092,192.168.8.80:9093,192.168.8.80:9094");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://192.168.8.80:8081");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        // No explicit Consumed serdes: the default key/value serdes from props apply.
        KStream<String, User> stream = builder.stream("input-topic-hhhh");
        stream.foreach((k, v) -> System.out.println(v));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the hook first: if it were added after start(), a signal received
        // in between (or a failure inside start()) would leave the client unclosed.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        try {
            streams.start();
        } catch (RuntimeException e) {
            // start() failed part-way; release whatever was already started
            // instead of leaving it running, then surface the failure.
            streams.close();
            throw e;
        }
    }
}